Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

break nested classes out of StreamingDataflowWorker #28537

Merged
merged 1 commit into from
Sep 28, 2023

Conversation

m-trieu
Copy link
Contributor

@m-trieu m-trieu commented Sep 19, 2023

StreamingDataflowWorker file is quite large. Opting to break out the inner static classes to a top level and introduce /streaming directory under the worker/ directory.

There will be future changes to these classes to support direct path, and this change will make the files easier to work with.

includes some formatting changes required by spotless

Slowly switching to java.util.Optional instead of the google Optional.

r: @kennknowles

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@kennknowles
Copy link
Member

Can you separate the independent bits into separate commits? (can still be one PR)

@m-trieu
Copy link
Contributor Author

m-trieu commented Sep 19, 2023

@kennknowles I split the commits, will do this ahead next time. Thanks!

@github-actions
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @AnandInguva added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

return Transport.getJsonFactory().fromString(input, MapTask.class);
}

public static void main(String[] args) throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles out of curiosity, do you happen to know where this is referenced? I was looking for it in a gradle file, but couldn't find it and not sure if its even referenced in those.

Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is referenced in dataflow internal launching of the Dataflow v1 Java worker (not in github repo).
With beam portability, the runner specifics like this are behind the FnApi but for the v1 harness the dataflow details leaked some. They were initially separate repositories but we kept having jar compatibility issues so we moved it to the external beam repository and single jar.

@m-trieu
Copy link
Contributor Author

m-trieu commented Sep 20, 2023

r: @scwhittle

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

import org.checkerframework.checker.nullness.qual.Nullable;

/** Bounded set of queues, with a maximum total weight. */
public class WeightedBoundedQueue<V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to add some tests for this now that it's visible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}

/** Returns the current weight of the queue. */
public int weight() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this method name and size() aren't particularly obvious, what do you think?

maybe
queuedElementsWeight()
queuedElementsCount()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

? ImmutableMap.copyOf(transformUserNameToStateFamily)
: ImmutableMap.of();
this.computationStateCache = computationStateCache;
Preconditions.checkNotNull(mapTask.getStageName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto (also if kept put the preconditions at the top?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved, mapTask is not nullable, but unfortunately systemName and stageName fields in map task are

this.mapTask = mapTask;
this.executor = executor;
this.transformUserNameToStateFamily =
transformUserNameToStateFamily != null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add @ Nullable annotation or remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

return executionStateQueue;
}

/** Mark the given shardedKey and work as active. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and schedules execution of work if there is no active work for the shardedKey already processing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB
static final int NUM_COMMIT_STREAMS = 1;
static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the other constants be moved to the top here?
(if not, what is the motivation for moving some of them?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think either the google java linter style or the spotless apply linter organizes constants in public, protected package private, then private order.

this was applied by spotless apply or google java formatter

}

private static final Random clientIdGenerator = new Random();
final WindmillStateCache stateCache;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be private final? and stay in same location?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is accessed in the StreamingDataflowWorkerTest.testMergeWindowsCaching, i can mark it as @VisibleForTesting?

I plan to do some breakup/cleanup of StreamingDataflowWorker in a later CL. If we opt to use dependency injection and inject the cache for tests, we can make all of this private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking we can eventually have either StreamingApplianceDataflowWorker or StreamingEngineDataflowWorker (depending on the StreamingDataflowWorkerOptions). StreamingDataflowWorker can instantiate either one depending on the passed in options, and we can have different tests for each.

We can inject all the dependencies into the constructors and have some static factory class/methods to create the shapes we want (for test or not etc).

Future code changes won't ever touch the explicit code paths/classes for appliance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also for my own knowledge, why don't we use Guice or Dagger dependency injection (if we want to avoid the runtime overhead)? @scwhittle

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles
I'm not very aware of Java ecosystem and didn't originally write these files. I'm not sure if either of those frameworks are used in Beam but I don't know if that was a conscious choice.

// is enabled.
private ScheduledExecutorService globalConfigRefreshTimer;
// Periodic sender of debug information to the debug capture service.
private DebugCapture.Manager debugCaptureManager = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove = null, mark it nullable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

workIdBuilder.append('-');
workIdBuilder.append(Long.toHexString(workItem.getWorkToken()));
DataflowWorkerLoggingMDC.setWorkId(workIdBuilder.toString());
String workIdBuilder =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer a builder. Does Java optimize repeated string additions better now? IIRC we maybe saw cpu or objects on a profile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe StringBuilder is very useful when concatenating in a loop, for simple concatenation the compiler will optimize for us. https://dzone.com/articles/string-concatenation-performacne-improvement-in-ja

@@ -1393,7 +1367,7 @@ private Commit batchCommitsToStream(CommitWorkStream commitStream) {
Commit commit;
try {
if (commits < 5) {
commit = commitQueue.poll(10 - 2 * commits, TimeUnit.MILLISECONDS);
commit = commitQueue.poll(10 - 2L * commits, TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is L required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to avoid integer multiplication implicitly cast to long (complaint from the static checker)

@m-trieu m-trieu force-pushed the mtrieu-refactor branch 2 times, most recently from b8229e0 to 766b440 Compare September 21, 2023 22:35
}

private static final Random clientIdGenerator = new Random();
final WindmillStateCache stateCache;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles
I'm not very aware of Java ecosystem and didn't originally write these files. I'm not sure if either of those frameworks are used in Beam but I don't know if that was a conscious choice.

assertEquals(MAX_WEIGHT, queue.queuedElementsWeight());
assertEquals(1, queue.size());

// Have another thread poll the queue, pulling off the only value inside and freeing up the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can just poll directly from this thread instead of via a new thread, easier to verify result etc then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));

// Insert value that takes all the capacity into the queue.
Thread thread1 = new Thread(() -> queue.put(MAX_WEIGHT));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can just put directly from the test thread

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

// Try to insert another value into the queue. This will block since there is no capacity in the
// queue.
Thread thread2 = new Thread(() -> queue.put(MAX_WEIGHT));
thread2.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sleep 100ms? just to give some time for thread to get into blocking put (test will pass either way so shouldn't add flakiness)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

WeightedBoundedQueue<Integer> queue =
WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));

assertNull(queue.poll());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add test of poll with timeout method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

throw new RuntimeException(e);
}
});
takeThread.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a sleep to give chance for it to start blocking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}

@Test
public void testTake() throws InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a variant where you poll with very large timeout and ensure it gets the element

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done used polling at 1 minute, inserting element at 30 seconds, so poll should return the element.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I just meant something like testPoll_withTimeout with a large timeout we never expected to get hit. You could remove testPoll_withLargeTimeout since I don't think it adds much beyond withTimeout and theses tests don't seem to run with much parallelism and we don't want to slow the build down too much.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sg removed!

* Marks the work for the given shardedKey as complete. Schedules queued work for the key if any.
*/
public void completeWork(ShardedKey shardedKey, long workToken) {
Work nextWork;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping on this and some other uncompleted comments

@@ -125,23 +126,28 @@ public boolean activateWork(ShardedKey shardedKey, Work work) {
/**
* Marks the work for the given shardedKey as complete. Schedules queued work for the key if any.
*/
public void completeWork(ShardedKey shardedKey, long workToken) {
Work nextWork;
public void completeWorkAndScheduleNextWork(ShardedKey shardedKey, long workToken) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe CompleteWorkAndScheduleNextWorkForKey

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

synchronized (activeWork) {
Queue<Work> queue = activeWork.get(shardedKey);
if (queue == null) {
Queue<Work> workQueue = activeWork.get(shardedKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nullable annotation or optional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made optional and moved into ActiveWorkState + tests

}

@Test
public void testTake() throws InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I just meant something like testPoll_withTimeout with a large timeout we never expected to get hit. You could remove testPoll_withLargeTimeout since I don't think it adds much beyond withTimeout and theses tests don't seem to run with much parallelism and we don't want to slow the build down too much.

});
takeThread.start();

Thread putThread =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove putThread, can just be inlined to test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -31,7 +31,10 @@ public abstract class NameContext {
* systemName} and a {@code userName}.
*/
public static NameContext create(
String stageName, String originalName, String systemName, String userName) {
String stageName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping, I realized this is just method and not members generated by autovalue below. But can stageName be non-null below?


@Override
public void close() throws Exception {
ExecutionState executionState;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping


import org.apache.beam.runners.dataflow.worker.windmill.Windmill;

public class KeyCommitTooLargeException extends Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping

if (keyToDisplay.size() > 100) {
keyToDisplay = keyToDisplay.substring(0, 100);
}
return String.format("%016x-%s", shardingKey(), TextFormat.escapeBytes(keyToDisplay));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping

return counterUpdates;
}

public String getStageName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping

"Active key %s without work, expected token %d",
shardedKey, workToken)));

if (completedWork.getWorkItem().getWorkToken() != workToken) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should be comparing the workToken and cacheToken

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline, will follow up in next cl


// Ensure we don't already have this work token queued.
for (Work queuedWork : workQueue) {
if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the cache token is different then it isn't a pure duplicate it could be a retry, in that case it might be better to take the more recent observed item (guessing it is more likely the newer one) or keep both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm, I don't think this is currently done.

do we want to queue the retry then? What happens if we process both, would Windmill make sure only 1 is committed w/ the exactly once processing?

in more detail if we take the most recent observed item, then would we have to remove the older Work(sameWorkToken, existingCacheToken) from the queue, and add the more recent or passed in Work(sameWorkToken, newCacheToken) to the queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline, will follow up in next cl

.build());
}

synchronized CommitsPendingCountAndActiveWorkStatus getPendingCommitsAndPrintActiveWorkAt(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about just changing this to printTo(PrintWriter writer) and doing the printing here?
Or just returning a String that we print?

I don't think we need to expose this for testing (don't see test anyway yet) and the current logic is a bit spread out with MAX_PRINTABLE_COMMIT_PENDING_KEYS used here when building and in printing. The header for the table is also separated from the population of the table here. Just having all of that in a single method instead of introducing CommitsPendingCountAndActiveWorkStatus seems cleaner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

execute(work);
return true;
}
// This will never happen, the switch is exhaustive.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move inside default case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@@ -120,6 +120,11 @@ public Collection<Windmill.LatencyAttribution> getLatencyAttributions() {
return list;
}

boolean isStuckAt(Instant stuckCommitDeadline) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isStuckCommittingAt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}

@Test
public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems odd to me because I don't think this is a legal state for the class. If somethig is active it is in the queue, if it becomes active and there is nothing it should clean itself up.

It seems like it would be better to test through the interface of the class instead of injecting internal state members, maybe verifying internal state but not directly manipulating it from the test.

For example here you coudl do the above, add an item, activate it, complete it, verify that the key doesn't exist in the internal map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done im also going to remove all of the tests that we can't "simulate" with the external API. changes the test map to readonly as well.

ShardedKey shardedKey = shardedKey("someKey", 1L);
Deque<Work> workQueue = new ArrayDeque<>();
workQueue.addLast(workInQueue);
activeWork.put(shardedKey, workQueue);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, this seems like it should just be combined with the above QUEUED test which sets up the right state, instead of hard-coding the state here which might not match the actual implementation.

If there was a bug and queueing the work returned QUEUED but didn't actually put it in the internal state, then the above test would pass and this test would pass.

Ditto for the below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, minor test improvement but otherwise feel free to ask someone else for review/merge.

shardedKey, workToBeCompleted.getWorkItem().getWorkToken());

assertTrue(nextWorkOpt.isPresent());
assertSame(nextWork, nextWorkOpt.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to then complete this returned work and verify that works

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Use ActiveWorkState class instead of an activeWorkMap in ComputationState
@m-trieu
Copy link
Contributor Author

m-trieu commented Sep 27, 2023

Thanks for reviewing @scwhittle @kennknowles !
@AnandInguva I squashed the commits, and this is ready for merge.

Thank you!

@AnandInguva AnandInguva merged commit 170310e into apache:master Sep 28, 2023
22 checks passed
@m-trieu m-trieu deleted the mtrieu-refactor branch June 10, 2024 15:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants